的确,对真正需要使用 Flink 访问 Hive 进行数据读写的读者会发现,Apache Flink 1.9.0 版本才开始提供与 Hive 集成的功能。 表数据 Flink 提供了 Hive Data Connector 来读写 Hive 的表数据。 Hive 的兼容,即 Flink 写入的数据 Hive 可以正常读取,并且反之亦然。 集成 Hive 功能 Flink 与 Hive 集成的功能在 1.9.0 版本中作为试用功能发布,存在不少使用的局限性,但是不久将发布的 Flink 1.10 稳定版本会更加完善集成 Hive 的功能并应用到企业场景中 如果同时部署了 Hive 和 Flink,那么通过 HiveCatalog 能够使用 Hive Metastore 来管理 Flink 的元数据。
modules概念 通过hive module使用hive函数 内置函数 自定义函数 sql 客户端的使用 原理分析和源码解析 实现 modules概念 flink 提供了一个module的概念,使用户能扩展 flink内置了CoreModule,并且提供了一个hive module,允许用户在加载了hive module之后使用hive的函数,包括内置函数、自定义hive函数等等。 通过hive module使用hive函数 我们以hive module为例,讲解一下如何使用flink提供的module功能,使用hive module的一些注意事项: 通过 Hive Metastore ,UDAF 和 GenericUDAFResolver2 则转换成 Flink 聚合函数(AggregateFunction).这样当我们就可以在flink中使用相应的hive函数了。 参考资料: [1].https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_functions.html
准备Hive 启动Hive服务 nohup $HIVE_HOME/bin/hiveserver2& 连接Hive服务 beeline -n hive -u jdbc:hive2://hadoop01:10000 --Hive JDBC--> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-jdbc</artifactId> ; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class Hive2Phoenix "); } } 读取Hive import com.alibaba.fastjson2.JSONObject; import com.xhkjedu.pojo.DBModel; import org.apache.flink.configuration.Configuration ; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import java.sql.Connection;
修改hive配置 案例讲解 引入相关的pom 构造hive catalog 创建hive表 将流数据插入hive, 遇到的坑 问题详解 修改方案 修改hive配置 上一篇介绍了使用sql将流式数据写入文件系统 ,这次我们来介绍下使用sql将文件写入hive,对于如果想写入已经存在的hive表,则至少需要添加以下两个属性. 引入相关的pom <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-hive 我基于社区的flink的tag release-1.11.0-rc4,我改了一下代码 将代码放到了github上。 https://github.com/zhangjun0x01/flink/tree/release-1.11.0-rc
--Hive JDBC--> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-jdbc</artifactId> ; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class Mysql2Hive 写入Hive package com.xhkjedu.mysql2hive; import com.alibaba.fastjson2.JSONObject; import org.apache.flink.configuration.Configuration ; import org.apache.flink.table.catalog.hive.HiveCatalog; import org.apache.flink.types.Row; public "myhive", hive); tEnv.useCatalog("myhive"); tEnv.getConfig().setSqlDialect(SqlDialect.HIVE
--Hive JDBC--> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-jdbc</artifactId> ; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class Hive2Hbase write data to hbase"); } } 读取Hive import com.alibaba.fastjson2.JSONObject; import org.apache.flink.configuration.Configuration ; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import java.sql.Connection "hive", "hive"); st = con.createStatement(); } @Override public void run(SourceContext
前言 本文使用环境版本 Hive:2.3.9 Flink:flink-1.12.7-bin-scala_2.12 使用代码连接到 Hive Hive 需要开启元数据服务 nohup hive --service 集成Hive只需要在Flink的lib中添加如下三个jar包 以Hive2.3.9为例,分别为: flink-sql-connector-hive-2.3.6_2.12-1.14.6.jar https ://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-hive-2.3.6_2.12/1.14.6/ flink-connector-hive _2.12-1.12.7.jar https://repo1.maven.org/maven2/org/apache/flink/flink-connector-hive_2.12/1.12.7/ hive-exec -2.3.9.jar Hive安装路径下的lib文件夹 Flink 支持以下 Hive 版本。
Apache Flink 从 1.9.0 版本开始增加了与 Hive 集成的功能,用户可以通过 Flink 来访问 Hive 的元数据,以及读写 Hive 中的表。 Flink on Hive 介绍 SQL 是大数据领域中的重要应用场景,为了完善 Flink 的生态,发掘 Flink 在批处理方面的潜力,我们决定增强 FlinkSQL 的功能,从而让用户能够通过 Flink Hive 的兼容,即 Flink 写入的数据 Hive 可以正常读取,并且反之亦然。 添加依赖 使用 Flink 与 Hive 集成的功能,用户首先需要添加相应的依赖。 Hive版本 所需依赖 2.3.4 flink-connector-hive_2.11flink-hadoop-compatibilityflink-shaded-hadoop-2-uber-2.7.5hive-exec
错误 今天在实验 Flink 连接 hive 的操作,由于 CDH 的 hadoop 是 HA,连接过程中报错如下: Exception in thread "main" java.lang.IllegalArgumentException at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:92) 解决方式 1.配置 HADOOP_CONF_DIR 环境变量 2.flink-conf.yaml
Flink 1.11.0 hadoop-3.0.3, hive-2.3.4 现象 写入Hive表的性能,每秒写入记录数,发现性能并不乐观,上有节点背压严重。 ? 写入Hive表.png Hive Table DDL: CREATE TABLE dw_db.dw_xxx_rt( 中间几十个字段省略, `position` string COMMENT '位置' ) 写入HDFS文件.png HDFS文件的DDL: drop table hive_catalog.dw_db.dw_xxx_hdfs; CREATE TABLE hive_catalog.dw_db.dw_xxx_hdfs 的PR,十几天前,阿里Flink的开发同学已经注意到了这个问题,我们将之吸收到测试环境,编译替换lib下jar包,重新测试,性能确实up了,单并发升至5W每秒,上游节点才稍微有背压。 [FLINK-19121][hive] Avoid accessing HDFS frequently in HiveBulkWriterFactory 所以,Flink的新特性从发布到应用线上,稳定性与性能上都不能过于乐观
/flink-sql-connector-hive-2.2.0_2.11/1.13.5/flink-sql-connector-hive-2.2.0_2.11-1.13.5.jar 如果你的Flink是其它版本 1.0.0 - 1.2.2 flink-sql-connector-hive-1.2.2 Download 2.0.0 - 2.2.0 flink-sql-connector-hive-2.2.0 Download 2.3.0 - 2.3.6 flink-sql-connector-hive-2.3.6 Download 3.0.0 - 3.1.2 flink-sql-connector-hive-3.1.2 Download 2048 -qu root.sparkstreaming -nm flink-cdc-hive 进入flink sql命令行 bin/sql-client.sh embedded -s flink-cdc-hive 表 创建hive需要指定SET table.sql-dialect=hive;,否则flink sql 命令行无法识别这个建表语法。
因此,Hive 表与 Flink SQL 有两种常见的用例: Lookup(查找)表用于丰富数据流 用于写入 Flink 结果的接收器 对于这些用例中的任何一个,还有两种方法可以使用 Hive 表。 将 Flink DDL 与 JDBC 连接器结合使用 使用 Flink JDBC 连接器,可以直接从控制台屏幕为任何 Hive 表创建 Flink 表,其中可以提供表的 Flink DDL 创建脚本。 Flink 能够缓存在 Hive 表中找到的数据以提高性能。需要设置 FOR SYSTEM_TIME AS OF 子句来告诉 Flink 与时态表连接。有关详细信息,请查看相关的 Flink 文档。 缺点:仅适用于非事务性表 使用 JDBC 连接器的 Flink DDL 表 使用带有 JDBC 连接器的 Hive 表时,默认情况下没有缓存,这意味着Flink 会为每个需要丰富的条目连接 Hive! 这也适用于更新插入流以及事务性 Hive 表。 结论 我们已经介绍了如何使用 SSB 通过 Hive 表丰富 Flink 中的数据流,以及如何使用 Hive 表作为 Flink 结果的接收器。
一、准备环境 1.根据产品文档安装Flink客户端; 2.将sql-client-defaults.yaml放入/opt/client/Flink/flink/conf中 3.将jaas.conf放入/ opt/client/Flink/flink/conf中 Client { com.sun.security.auth.module.Krb5LoginModule required useKeyTab /flink/conf/jaas.conf $JVM_ARGS" 二、启动Flink集群 例如:yarn-session.sh -t ssl -d 三、启动SQL-Client . /Hive/config hive-version: 3.1.0 2)在/opt/clienrc5/Hive/config/hive-site.xml添加配置 <property> <name> ; SET table.sql-dialect=hive; CREATE TABLE IF NOT EXISTS hive_dialect_tbl ( `id` int , `name` string
本文介绍了 SmartNews 利用 Flink 加速 Hive 日表的生产,将 Flink 无缝地集成到以 Airflow 和 Hive 为主的批处理系统的实践。 团队对 Flink 有比较好的背景,加上 Flink 近期对 Hive 的改进较多,因此决定采用基于 Flink 的方案。 技术挑战 挑战是多方面的。 输出 RC 文件格式 当前 Hive 表的文件格式为 RCFile,为了保证对用户的透明,我们只能在现有的 Hive 表上做 in-place 的 upgrade,也就是我们得重用当前表,那么 Flink 但对于 Flink 作业来说,没有结束的信号,它只能往 Hive 里面提交一个个的 partition,如 dt=2021-05-29/action=refresh。 将来我们将利用同样的技术,去加速更多其他的 Hive 表的生产,并且广泛提供更细粒度 hive 表示的生产,例如小时级。
分区提交策略 总结 前言 前段时间我们讲解了flink1.11中如何将流式数据写入文件系统和hive [flink 1.11 使用sql将流式数据写入hive],今天我们来从源码的角度深入分析一下。 以便朋友们对flink流式数据写入hive有一个深入的了解,以及在出现问题的时候知道该怎么调试。 数据流处理 我们这次主要是分析flink如何将类似kafka的流式数据写入到hive表,我们先来一段简单的代码: //构造hive catalog String name = "myhive"; 具体的写入ORC格式的数据,可以参考下这个文章: flink 1.11 流式数据ORC格式写入file ,由于我们这次主要是讲整体写入hive的流程,这个sink就不做太具体的讲解了。 总结 通过上述的描述,我们简单聊了一下flink是如何将流式数据写入hive的,但是可能每个人在做的过程中还是会遇到各种各种的环境问题导致的写入失败,比如window和linux系统的差异,hdfs版本的差异
自定义函数 import org.apache.flink.table.functions.ScalarFunction; // 定义函数逻辑 public class SubstringFunction
Flink使用HiveCatalog可以通过批或者流的方式来处理Hive中的表。 本文将以Flink1.12为例,介绍Flink集成Hive的另外一个非常重要的方面——Hive维表JOIN(Temporal Table Join)与Flink读写Hive表的方式。 Flink写入Hive表 Flink支持以批处理(Batch)和流处理(Streaming)的方式写入Hive表。当以批处理的方式写入Hive表时,只有当写入作业结束时,才可以看到写入的数据。 下面的示例是将kafka的数据流式写入Hive的分区表 -- 使用流处理模式 Flink SQL> set execution.type=streaming; -- 使用Hive方言 Flink SQL Flink读取Hive表 Flink支持以批处理(Batch)和流处理(Streaming)的方式读取Hive中的表。
使用Flink的SQL Gateway迁移Hive SQL任务 前言 我们有数万个离线任务,主要还是默认的DataPhin调度CDP集群的Hive On Tez这种低成本任务,当然也有PySpark、打 但是很少有听说过Hive On Flink【虽然翻Hive的源码好像可以去实现它】。 所以本文重点就是这个Hive On Flink。用流批一体的运算引擎去跑批也是个有趣的事情。 Hive On Flink原理 新增的支持 Hive任务能使用Flink来跑,Flink当然是做了很多支持: Hive的MetaStore在大数据领域的地位相当于K8S在云原生容器编排领域的地位,或者 能解析Hive的Metastore就可以管理Hadoop集群绝大多数的Hive表了。当然Hudi的一些表、Flink的一些SQL流式表也可能被管控到。 总结 从Flink1.16.0开始,就可以使用Hive On Flink了,SQL Boy们可以依旧只关心所谓的逻辑,只写几个Join。
flink 扩展支持用户自定义的 hive udf:主要介绍 flink sql 流任务中,不能使用 create temporary function 去引入一个用户自定义的 hive udf。 实时数据使用 flink 产出,离线数据使用 hive\spark 产出。 那么回到我们文章标题的问题:为什么需要 flink 支持 hive udf 呢? 因此在 flink 中支持 hive udf 这件事对开发人员提效来说是非常有好处的。 3.在扩展前,你需要知道一些基本概念 flink 支持 hive udf 这件事分为两个部分。 flink 扩展支持 hive 内置 udf flink 扩展支持用户自定义 hive udf 第一部分:flink 扩展支持 hive 内置 udf,比如 get_json_object,rlike 4.2.flink 扩展支持 hive 内置 udf 步骤如下: 引入 hive 的 connector。其中包含了 flink 官方提供的一个 HiveModule。
版本:hive-2.3.4Hadoop版本:hadoop-2.7.3flink: flink-1.10.0scala:scala-2.11kafka:kafka_2.11-2.3.0 有关java 、hive、hadoop的安装之前写过了: Hive源码系列(一)hive2.1.1+hadoop2.7.3环境搭建 下面准备一下flink,scala,kafka环境 1.1 scala安装 下载 ## slavesvim $FLINK_HOME/conf/slaves ##配置从节点ip ##写入dataming 以上 flink单例模型配置完毕 2、SQL Client与hive集成配置 2.1 配制yaml文件 cp $FLINK_HOME/conf/sql-client-defaults.yaml sql-client-hive.yamlvim $FLINK_HOME/conf/sql-client-hive.yaml 此时在hive中也能看到用flink sql client 新创建的表啦: ? 3.3 写数据 此时,用kafka生产端写入几条数据,可以从flink端查到了: ? ?